View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: AbstractTopicConsumerEndpoint.java,v 1.3 2007/01/24 12:00:28 tanderson Exp $
44   */
45  
46  package org.exolab.jms.messagemgr;
47  
48  import java.util.Collections;
49  import java.util.HashMap;
50  import java.util.Map;
51  import javax.jms.InvalidSelectorException;
52  import javax.jms.JMSException;
53  
54  import org.apache.commons.logging.Log;
55  import org.apache.commons.logging.LogFactory;
56  
57  import org.exolab.jms.client.JmsDestination;
58  import org.exolab.jms.client.JmsTopic;
59  import org.exolab.jms.message.MessageImpl;
60  import org.exolab.jms.persistence.PersistenceException;
61  import org.exolab.jms.server.ServerConnection;
62  
63  
64  /***
65   * A {@link ConsumerEndpoint} for topics.
66   *
67   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
68   * @version $Revision: 1.3 $ $Date: 2007/01/24 12:00:28 $
69   */
70  abstract class AbstractTopicConsumerEndpoint extends AbstractConsumerEndpoint
71          implements DestinationEventListener {
72  
73      /***
74       * The identity of the connection that owns this consumer, or
75       * <code>-1</code> if this consumer isn't currently associated with a
76       * connection.
77       */
78      private long _connectionId;
79  
80      /***
81       * The destination manager.
82       */
83      private final DestinationManager _destinations;
84  
85      /***
86       * Cache of all handles for this consumer.
87       */
88      private MessageQueue _handles = new MessageQueue();
89  
90      /***
91       * Maintains a map of TopicDestinationCache that this endpoint subscribes
92       * to, keyed on JmsTopic. A wildcard subscription may point to more than
93       * one.
94       */
95      protected Map _caches = Collections.synchronizedMap(new HashMap());
96  
97      /***
98       * The logger.
99       */
100     private static final Log _log =
101             LogFactory.getLog(AbstractTopicConsumerEndpoint.class);
102 
103 
104     /***
105      * Construct a new <code>TopicConsumerEndpoint</code>.
106      * <p/>
107      * The destination and selector determine where it will be sourcing its
108      * messages from, and scheduler is used to asynchronously deliver messages
109      * to the consumer.
110      *
111      * @param consumerId   the identity of this consumer
112      * @param connectionId the identity of the connection that owns this
113      *                     consumer
114      * @param topic        the topic(s) to access. May be a wildcarded topic.
115      * @param selector     the message selector. May be <code>null</code>
116      * @param noLocal      if true, inhibits the delivery of messages published
117      *                     by its own connection.
118      * @param destinations the destination manager
119      * @throws InvalidSelectorException if the selector is invalid
120      * @throws JMSException             if the destination caches can't be
121      *                                  constructed
122      */
123     public AbstractTopicConsumerEndpoint(long consumerId, long connectionId,
124                                          JmsTopic topic,
125                                          String selector, boolean noLocal,
126                                          DestinationManager destinations)
127             throws JMSException {
128         super(consumerId, topic, selector, noLocal);
129         _connectionId = connectionId;
130         _destinations = destinations;
131     }
132 
133     /***
134      * Returns the identity of the connection that owns this consumer.
135      *
136      * @return the identity of the connection, or <code>-1</code> if this is not
137      *         currently associated with a connection.
138      * @see ServerConnection#getConnectionId
139      */
140     public long getConnectionId() {
141         return _connectionId;
142     }
143 
144     /***
145      * Determines if this consumer can consume messages from the specified
146      * destination.
147      *
148      * @param destination the destination
149      * @return <code>true</code> if the consumer can consume messages from
150      *         <code>destination</code>; otherwise <code>false</code>
151      */
152     public boolean canConsume(JmsDestination destination) {
153         boolean result = false;
154         if (destination instanceof JmsTopic) {
155             JmsTopic topic = (JmsTopic) getDestination();
156             if (!topic.isWildCard()) {
157                 result = super.canConsume(destination);
158             } else {
159                 result = topic.match((JmsTopic) destination);
160             }
161         }
162         return result;
163     }
164 
165     /***
166      * Return a delivered, but unacknowledged message to the cache.
167      *
168      * @param handle the handle of the message to return
169      */
170     public void returnMessage(MessageHandle handle) {
171         addMessage(handle);
172     }
173 
174     /***
175      * Return the number of unsent messages in the cache for this consumer.
176      *
177      * @return the number of unsent messages
178      */
179     public int getMessageCount() {
180         return _handles.size();
181     }
182 
183     /***
184      * This event is called when a non-persistent message is added to the
185      * <code>DestinationCache</code>.
186      *
187      * @param handle  a handle to the message
188      * @param message the added message
189      * @return <code>true</code> if the listener accepted the message; otherwise
190      *         <code>false</ode>
191      * @throws JMSException if the listener fails to handle the message
192      */
193     public boolean messageAdded(MessageHandle handle, MessageImpl message)
194             throws JMSException {
195         boolean accepted = true;
196 
197         // if the 'noLocal' indicator is set, and the message arrived on
198         // the same connection, ignore the message
199         if (getNoLocal() && message.getConnectionId() == getConnectionId()) {
200             accepted = false;
201         } else {
202             // create a message handle for this consumer
203             handle = new TopicConsumerMessageHandle(handle, this);
204 
205             if (!_handles.contains(handle)) {
206                 // if the message is not already in the cache then add it
207                 addMessage(handle);
208             } else {
209                 accepted = false;
210                 _log.warn("Endpoint=" + this + " already has message cached: " +
211                           handle);
212             }
213         }
214         return accepted;
215     }
216 
217     /***
218      * This event is called when a message is removed from the
219      * <code>DestinationCache</code>.
220      *
221      * @param messageId the identifier of the removed message
222      * @throws JMSException if the listener fails to handle the message
223      */
224     public void messageRemoved(String messageId) throws JMSException {
225         MessageHandle handle = _handles.remove(messageId);
226         if (handle != null) {
227             handle.destroy();
228         }
229     }
230 
231     /***
232      * This event is called when a persistent message is added to the
233      * <code>DestinationCache</code>.
234      *
235      * @param handle  a handle to the added message
236      * @param message the added message
237      * @return <code>true</code> if the listener accepted the message;
238      * @throws JMSException         if the listener fails to handle the message
239      * @throws PersistenceException if there is a persistence related problem
240      */
241     public boolean persistentMessageAdded(MessageHandle handle,
242                                           MessageImpl message)
243             throws JMSException, PersistenceException {
244         boolean accepted = true;
245 
246         // if the 'noLocal' indicator is set, and the message arrived on
247         // the same connection, ignore the message
248         if (getNoLocal() && message.getConnectionId() == getConnectionId()) {
249             accepted = false;
250         } else {
251             // create a message handle for this consumer
252             handle = new TopicConsumerMessageHandle(handle, this);
253             if (isPersistent()) {
254                 // and make it persistent if this is a durable consumer
255                 handle.add();
256             }
257 
258             if (!_handles.contains(handle)) {
259                 // if the message is not already in the cache then add it
260                 addMessage(handle);
261             } else {
262                 accepted = false;
263                 _log.warn("Endpoint=" + this + " already has message cached: " +
264                           handle);
265             }
266         }
267         return accepted;
268     }
269 
270     /***
271      * This event is called when a message is removed from the
272      * <code>DestinationCache</code>.
273      *
274      * @param messageId the identifier of the removed message
275      * @throws JMSException         if the listener fails to handle the message
276      * @throws PersistenceException if there is a persistence related problem
277      */
278     public void persistentMessageRemoved(String messageId)
279             throws JMSException, PersistenceException {
280         MessageHandle handle = _handles.remove(messageId);
281         if (handle != null) {
282             handle.destroy();
283         }
284     }
285 
286 
287     /***
288      * Invoked when a destination is created.
289      *
290      * @param destination the destination that was added
291      */
292     public void destinationAdded(JmsDestination destination) {
293         // no-op
294     }
295 
296     /***
297      * Invoked when a destination is removed.
298      *
299      * @param destination the destination that was removed
300      */
301     public void destinationRemoved(JmsDestination destination) {
302         // no-op
303     }
304 
305     /***
306      * Invoked when a message cache is created.
307      *
308      * @param destination the destination that messages are being cached for
309      * @param cache       the corresponding cache
310      */
311     public void cacheAdded(JmsDestination destination,
312                            DestinationCache cache) {
313         if (destination instanceof JmsTopic) {
314             JmsTopic myTopic = (JmsTopic) getDestination();
315             JmsTopic topic = (JmsTopic) destination;
316             if (myTopic.match(topic) && !_caches.containsKey(topic)) {
317                 _caches.put(topic, cache);
318                 cache.addConsumer(this);
319             }
320         }
321     }
322 
323     /***
324      * Invoked when a message cache is removed.
325      *
326      * @param destination the destination that messages are no longer being
327      *                    cached for
328      * @param cache       the corresponding cache
329      */
330     public void cacheRemoved(JmsDestination destination,
331                              DestinationCache cache) {
332         if (destination instanceof JmsTopic) {
333             _caches.remove(destination);
334         }
335     }
336 
337     /***
338      * Registers this with the associated {@link DestinationCache}s. The
339      * consumer may receive messages immediately.
340      *
341      * @throws JMSException for any JMS error
342      */
343     protected void init() throws JMSException {
344         JmsTopic topic = (JmsTopic) getDestination();
345 
346         // register the endpoint with the destination
347         if (topic.isWildCard()) {
348             // if the topic is a wild card then we need to retrieve a
349             // set of matching destination caches.
350             _caches = _destinations.getTopicDestinationCaches(topic);
351             // for each cache register this endpoint as a consumer of
352             // it's messages. Before doing so register as a destination
353             // event listener with the DestinationManager
354             _destinations.addDestinationEventListener(this);
355             DestinationCache[] caches = getDestinationCaches();
356             for (int i = 0; i < caches.length; ++i) {
357                 caches[i].addConsumer(this);
358             }
359         } else {
360             // if the topic is not a wildcard then we need to get the
361             // destination cache. If one does not exist then we need to
362             // create it.
363             DestinationCache cache = _destinations.getDestinationCache(topic);
364             _caches.put(topic, cache);
365             cache.addConsumer(this);
366         }
367     }
368 
369     /***
370      * Set the connection identifier.
371      *
372      * @param connectionId the identity of the connection that owns this
373      *                     consumer
374      * @see #getConnectionId
375      */
376     protected void setConnectionId(long connectionId) {
377         _connectionId = connectionId;
378     }
379 
380     /***
381      * Add the handle to the cache.
382      *
383      * @param handle the message handle to add
384      */
385     protected void addMessage(MessageHandle handle) {
386         _handles.add(handle);
387         notifyMessageAvailable();
388     }
389 
390     /***
391      * Return the next available message to the client.
392      *
393      * @return the next message, or <code>null</code> if none is available
394      * @throws JMSException for any error
395      * @param cancel
396      */
397     protected MessageHandle doReceive(Condition cancel) throws JMSException {
398         MessageHandle result = null;
399         MessageHandle handle;
400         while (!cancel.get() && (handle = _handles.removeFirst()) != null) {
401             if (_log.isDebugEnabled()) {
402                 _log.debug("doReceive() - next available=" + handle.getMessageId());
403             }
404             // ensure that the message still exists
405             MessageImpl message = handle.getMessage();
406             if (message != null) {
407                 if (selects(message)) {
408                     // got a message which is applicable for the endpoint
409                     result = handle;
410                     break;
411                 } else {
412                     // message has been filtered out so destroy the handle.
413                     handle.destroy();
414                 }
415             }
416         }
417         if (_log.isDebugEnabled()) {
418             _log.debug("doReceive() - result=" + (result != null ? result.getMessageId() : null));
419         }
420         return result;
421     }
422 
423     /***
424      * Closes this endpoint.
425      */
426     protected void doClose() {
427         // unregister as a destination event listener
428         _destinations.removeDestinationEventListener(this);
429 
430         // unregister from the destination before continuing
431         DestinationCache[] caches = getDestinationCaches();
432         for (int i = 0; i < caches.length; ++i) {
433             caches[i].removeConsumer(this);
434         }
435         _caches.clear();
436 
437         if (!isPersistent()) {
438             // for non-persistent consumers, destroy all outstanding message
439             // handles
440             MessageHandle[] handles = _handles.toArray();
441             for (int i = 0; i < handles.length; ++i) {
442                 MessageHandle handle = handles[i];
443                 try {
444                     handle.destroy();
445                 } catch (JMSException exception) {
446                     _log.error(exception, exception);
447                 }
448             }
449         }
450     }
451 
452     /***
453      * Returns the destination manager.
454      *
455      * @return the destination manager
456      */
457     protected DestinationManager getDestinationManager() {
458         return _destinations;
459     }
460 
461     /***
462      * Returns the destination caches.
463      *
464      * @return the destination caches
465      */
466     protected DestinationCache[] getDestinationCaches() {
467         return (DestinationCache[]) _caches.values().toArray(
468                 new DestinationCache[0]);
469     }
470 
471 }